package ink.tsg.flink.temperature;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * @author tsg
 * @version 1.0
 * @description: TODO
 * @date 2022/5/17 16:20
 */
public class MyKeyedProcessFunction extends KeyedProcessFunction<Integer, SensorRecord, SensorRecord> {

    private int tempDiff;

    public MyKeyedProcessFunction(int tempDiff) {
        this.tempDiff = tempDiff;
    }

    private transient ValueState<Double> lastTemp;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<Double> lastTempDescriptor = new ValueStateDescriptor<Double>(
                "last-temp",
                Double.class);

        lastTemp = getRuntimeContext().getState(lastTempDescriptor);
    }

    @Override
    public void processElement(SensorRecord value, Context ctx, Collector<SensorRecord> out) throws Exception {

        //第一条数据，需要处理
        if (lastTemp.value() == null){
            lastTemp.update(Double.MIN_VALUE);
        }
        //从第二条数据开始比较两者的差值
        else if (Math.abs(value.getRecord() - lastTemp.value()) > tempDiff){
            value.setLastRecord(lastTemp.value());
            out.collect(value);
        }

        //value state 记录最新的值
        if (value.getRecord() != null){
            lastTemp.update(value.getRecord());
        }
    }
}
